-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix memory memory management for flatMap #376
base: master
Are you sure you want to change the base?
Conversation
prevent the flatMap operator from accumulating in an internal array while concurrency is maxed out
// add the outer iterator to the race | ||
results[0] = outer.next(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is safe, since here we are still potentially waiting on the latest promise from the outer iterator to resolve.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you mean. I'll see if I can't figure out a better way of doing it without potentially loosing values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @trxcllnt thanks for pointing this out, 8f90bc7 resolves the tests locally for me, as you Identified some were failing because I was loosing items due to the above calling next when a previous item still remained unresolved and other tests were failing because next was called on the outer after it had been completed above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests currently failing, possibly due to L124-L125 changes. Let me know if you'd like help running or debugging the tests.
results[0] = outer.next(); | ||
} else { | ||
// remove the outer iterator from the race, we're full | ||
results[0] = NEVER_PROMISE; | ||
outerValues.push(value as TSource); | ||
} | ||
results[0] = outer.next(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for calling this out.
This operator is technically working as designed, i.e. we are intentionally pulling and buffering values from the outer sequence as quickly as they're emitted, but only flattening concurrent
number of inner sequences at a time (similar to the Observable implementation).
But I can see how the internal buffering is problematic, and I agree that we shouldn't need to buffer the outer sequence values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @trxcllnt, I manged to get the tests running locally, thanks for your help identifying the problems with my code.
When I started looking at the actual code I figured it must have been by design.
My intent was to fan out to do multiple API requests ( to AWS SQS) concurrently based on the result of of a dynamodb scan and the speed that I could processes the items is reduced significantly when I removed the concurrency and tried a concatMap
( although this technically works, and solves my memory issues, although a little slower for my application).
My initial motivation for moving from RX to IX was to fix this issue, figuring it would be difficult to prevent my memory issues unless I had a way of throttling the producer, I assume another approach that would work as the library currently stands is to use a throttle
before the flatMap
and attempt to tweak that time so the consumer can keep up, and the buffer never explodes.
On the tests, I managed to run the unit tests although the contribution guide mentions performance tests but I seem unable to locate these?
* loosing values due to using .next while still waiting for outer iterator * adding completed iterator to race resulting in exception
@thepont Apologies for the delay, I somehow lost track of this PR. I'll try to get this merged and publish a new version later today. Thanks for understanding! |
Description:
This change prevents us from requesting a value from the outer iterator if we are currently maxed out for concurrency on our inner iterators, This prevents a potential OOM exception where the inner iterators process at a slower rate the the outer iterator can produce.
Related issue (if exists): #375